package cn.doitedu.udfs;

import cn.doitedu.pojo.DynamicKeyedBean;
import cn.doitedu.pojo.LogBean;
import cn.doitedu.pojo.RuleCondition;
import cn.doitedu.pojo.RulesBean;
import cn.doitedu.utils.StateDescriptorUtils;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.lang.reflect.Field;
import java.util.*;

/**
 * Fans every incoming {@link LogBean} out into one {@link DynamicKeyedBean} per key field
 * named in the {@code keyBy} setting of the rules currently held in broadcast state.
 *
 * <p>Rules arrive on the broadcast side: a rule whose status is {@code 1} is stored in
 * broadcast state under its id, any other status removes it. The key field names are always
 * derived from the rules in broadcast state, so they shrink again when a rule is removed and
 * they can be rebuilt from state when this function instance starts without them.
 */
public class DynamicKeyProcessFunction extends BroadcastProcessFunction<LogBean, RulesBean, DynamicKeyedBean> {

    /** Value of {@code rule_status} for which a rule is stored in broadcast state. */
    private static final int STATUS_ENABLED = 1;

    /**
     * Key field name -> accessible {@link Field} of {@link LogBean}, covering every key of the
     * rules in broadcast state. Duplicated key names across rules collapse into one entry.
     *
     * <p>Transient because {@link Field} is not serializable; {@code null} means "not built yet"
     * and triggers a rebuild from broadcast state on the next data element. A sorted map is
     * used so the emission order per record does not depend on state iteration order.
     */
    private transient Map<String, Field> keyFields;

    /**
     * Emits one {@link DynamicKeyedBean} per configured key field for the given record.
     *
     * @param bean the log record to fan out
     * @param ctx  read-only context used to reach the rules broadcast state
     * @param out  collector receiving {@code (keyName, keyValue, bean)} for every key field
     * @throws Exception if the rules cannot be read, or a rule names a field that
     *                   {@link LogBean} does not declare
     */
    @Override
    public void processElement(LogBean bean, ReadOnlyContext ctx, Collector<DynamicKeyedBean> out) throws Exception {

        // The cached view is not part of Flink state, so build it from broadcast state
        // whenever this instance does not have it yet.
        if (keyFields == null) {
            keyFields = resolveKeyFields(ctx.getBroadcastState(StateDescriptorUtils.rulesStateDescriptor));
        }

        for (Map.Entry<String, Field> keyField : keyFields.entrySet()) {
            Object rawValue = keyField.getValue().get(bean);
            // toString() instead of a (String) cast so non-String fields can be used as keys;
            // a null field value is passed through as null, exactly as before.
            String keyValue = rawValue == null ? null : rawValue.toString();
            out.collect(new DynamicKeyedBean(keyField.getKey(), keyValue, bean));
        }
    }

    /**
     * Applies a rule change to broadcast state and refreshes the cached key fields.
     *
     * @param rulesBean the rule to store (status {@code 1}) or to remove (any other status)
     * @param ctx       context giving write access to the rules broadcast state
     * @param out       unused; rule changes emit nothing
     * @throws Exception if the state cannot be accessed, or the rule names a field that
     *                   {@link LogBean} does not declare
     */
    @Override
    public void processBroadcastElement(RulesBean rulesBean, Context ctx, Collector<DynamicKeyedBean> out) throws Exception {

        int status = rulesBean.getRule_status();
        BroadcastState<Long, RulesBean> broadcastState = ctx.getBroadcastState(StateDescriptorUtils.rulesStateDescriptor);

        if (status == STATUS_ENABLED) {
            // Resolve the rule's keys before storing it, so a rule naming an unknown field
            // fails here instead of being written to state and failing on later records.
            for (String keyName : parseKeyNames(rulesBean)) {
                lookupField(keyName);
            }
            broadcastState.put(rulesBean.getId(), rulesBean);
        } else {
            broadcastState.remove(rulesBean.getId());
        }

        // Rebuild from state rather than only adding: keys of removed rules disappear,
        // while keys still referenced by another stored rule are kept.
        keyFields = resolveKeyFields(broadcastState);
    }

    /** Collects the key fields of all rules in the given state, sorted by key name. */
    private static Map<String, Field> resolveKeyFields(ReadOnlyBroadcastState<Long, RulesBean> rules) throws Exception {
        Map<String, Field> resolved = new TreeMap<>();
        for (Map.Entry<Long, RulesBean> rule : rules.immutableEntries()) {
            for (String keyName : parseKeyNames(rule.getValue())) {
                if (!resolved.containsKey(keyName)) {
                    resolved.put(keyName, lookupField(keyName));
                }
            }
        }
        return resolved;
    }

    /**
     * Extracts the comma-separated {@code keyBy} names from a rule's condition JSON.
     * Names are trimmed and blank entries dropped; a rule without a condition or without
     * {@code keyBy} contributes no keys.
     */
    private static List<String> parseKeyNames(RulesBean rule) {
        RuleCondition condition = JSON.parseObject(rule.getRule_condition_json(), RuleCondition.class);
        if (condition == null || condition.getKeyBy() == null) {
            return Collections.emptyList();
        }
        List<String> names = new ArrayList<>();
        for (String rawName : condition.getKeyBy().split(",")) {
            String name = rawName.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    /** Looks up a field declared directly on {@link LogBean} and makes it accessible. */
    private static Field lookupField(String keyName) throws NoSuchFieldException {
        Field field = LogBean.class.getDeclaredField(keyName);
        field.setAccessible(true); // fields may be private; bypass the access check
        return field;
    }
}
